[Kafka]02 Kafka入门 二

关键术语、工作原理和过程

Posted by 李玉坤 on 2017-11-14

Kafka 中的关键术语

Topic:主题。在 Kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 Topic。Topic 相当于消息的分类标签,是一个逻辑概念。

物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。

Partition:分区。Topic 中的消息被分割为一个或多个 Partition,其是一个物理概念,对应到系统上 就是一个或若干个目录。Partition 内部的消息是有序的,但 Partition 间的消息是无序的。

Segment 段。将 Partition 进一步细分为了若干的 Segment,每个 Segment 文件的大小相等。

Broker:Kafka 集群包含一个或多个服务器,每个服务器节点称为一个 Broker。

Broker 存储 Topic 的数据。如果某 Topic 有 N 个 Partition,集群有 N 个 Broker,那么每个 Broker 存储该 Topic 的一个 Partition。

如果某 Topic 有 N 个 Partition,集群有(N+M)个 Broker,那么其中有 N 个 Broker 存储该 Topic 的一个 Partition,剩下的 M 个 Broker 不存储该 Topic 的 Partition 数据。

如果某 Topic 有 N 个 Partition,集群中 Broker 数目少于 N 个,那么一个 Broker 存储该 Topic 的一个或多个 Partition。

在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

Producer:生产者。即消息的发布者,生产者将数据发布到他们选择的主题。

生产者负责选择将哪个记录分配给主题中的哪个分区。即:生产者生产的一条消息,会被写入到某一个 Partition。

Consumer:消费者。可以从 Broker 中读取消息。一个消费者可以消费多个 Topic 的消息;一个消费者可以消费同一个 Topic 中的多个 Partition 中的消息;一个 Partiton 允许多个 Consumer 同时消费。

Consumer Group:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

组内可以有多个消费者,它们共享一个公共的 ID,即 Group ID。组内的所有消费者协调在一起来消费订阅主题 的所有分区。

Kafka 保证同一个 Consumer Group 中只有一个 Consumer 会消费某条消息。

实际上,Kafka 保证的是稳定状态下每一个 Consumer 实例只会消费某一个或多个特定的 Partition,而某个 Partition 的数据只会被某一个特定的 Consumer 实例所消费。

下面用官网的一张图, 来标识 Consumer 数量和 Partition 数量的对应关系。

由两台服务器组成的 Kafka 群集,其中包含四个带有两个使用者组的分区(P0-P3)。消费者组 A 有两个消费者实例,组 B 有四个。

对于这个消费组, 总结:Topic 中的 Partitoin 到 Group 是发布订阅的通信方式。

即一条 Topic 的 Partition 的消息会被所有的 Group 消费,属于一对多模式;Group 到 Consumer 是点对点通信方式,属于一对一模式。

举个例子:不使用 Group 的话,启动 10 个 Consumer 消费一个 Topic,这 10 个 Consumer 都能得到 Topic 的所有数据,相当于这个 Topic 中的任一条消息被消费 10 次。

使用 Group 的话,连接时带上 groupid,Topic 的消息会分发到 10 个 Consumer 上,每条消息只被消费 1 次。

Replizcas of partition:分区副本。副本是一个分区的备份,是为了防止消息丢失而创建的分区的备份。

Partition Leader:每个 Partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责消息读写 的 Partition。即所有读写操作只能发生于 Leader 分区上。

Partition Follower:所有 Follower 都需要从 Leader 同步消息,Follower 与 Leader 始终保持消息同步。Leader 与 Follower 的关系是主备关系,而非主从关系。

ISR:

  • ISR,In-Sync Replicas,是指副本同步列表。ISR 列表是由 Leader 负责维护。
  • AR,Assigned Replicas,指某个 Partition 的所有副本, 即已分配的副本列表。
  • OSR,Outof-Sync Replicas,即非同步的副本列表。
  • AR=ISR+OSR

Offset:偏移量。每条消息都有一个当前 Partition 下唯一的 64 字节的 Offset,它是相当于当前分区第一条消息的偏移量。

Broker Controller:Kafka集群的多个 Broker 中,有一个会被选举 Controller,负责管理整个集群中 Partition 和 Replicas 的状态。

只有 Broker Controller 会向 Zookeeper 中注册 Watcher,其他 Broker 及分区无需注册。即 Zookeeper 仅需监听 Broker Controller 的状态变化即可。

HW 与 LEO:

  • HW,HighWatermark,高水位,表示 Consumer 可以消费到的最高 Partition 偏移量。HW 保证了 Kafka 集群中消息的一致性。确切地说,是保证了 Partition 的 Follower 与 Leader 间数 据的一致性。
  • LEO,Log End Offset,日志最后消息的偏移量。消息是被写入到 Kafka 的日志文件中的, 这是当前最后一个写入的消息在 Partition 中的偏移量。
  • 对于 Leader 新写入的消息,Consumer 是不能立刻消费的。Leader 会等待该消息被所有 ISR 中的 Partition Follower 同步后才会更新 HW,此时消息才能被 Consumer 消费。

图来形象话的表示HW 与 LEO两者的关系

Zookeeper:Zookeeper 负责维护和协调 Broker,负责 Broker Controller 的选举。在 Kafka 0.9 之前版本,Offset 是由 ZK 负责管理的。

总结:ZK 负责 Controller 的选举,Controller 负责 Leader 的选举。

Coordinator:一般指的是运行在每个 Broker 上的 Group Coordinator 进程,用于管理 Consumer Group 中的各个成员,主要用于 Offset 位移管理和 Rebalance。一个 Coordinator 可以同时管理多个消费者组。

Rebalance:当消费者组中的数量发生变化,或者 Topic 中的 Partition 数量发生了变化时,Partition 的所有权会在消费者间转移,即 Partition 会重新分配,这个过程称为再均衡 Rebalance。

再均衡能够给消费者组及 Broker 带来高性能、高可用性和伸缩,但在再均衡期间消费者是无法读取消息的,即整个 Broker 集群有小一段时间是不可用的。因此要避免不必要的再均衡。

Offset Commit:Consumer 从 Broker 中取一批消息写入 Buffer 进行消费,在规定的时间内消费完消息后,会自动将其消费消息的 Offset 提交给 Broker,以记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,其是不会提交 Offset 的。

Kafka的工作原理和过程

①消息写入算法

消息发送者将消息发送给 Broker, 并形成最终的可供消费者消费的 log,是一个比较复杂的过程:

  • Producer 先从 Zookeeper 中找到该 Partition 的 Leader。
  • Producer将消息发送给该 Leader。
  • Leader 将消息接入本地的 log,并通知 ISR 的 Followers。
  • ISR 中的 Followers 从 Leader 中 Pull 消息, 写入本地 log 后向 Leader 发送 Ack。
  • Leader 收到所有 ISR 中的 Followers 的 Ack 后,增加 HW 并向 Producer 发送 Ack,表示消息写入成功。

②消息路由策略

在通过 API 方式发布消息时,生产者是以 Record 为消息进行发布的。

Record 中包含 Key 与 Value,Value 才是真正的消息本身,而 Key 用于路由消息所要存放的 Partition。

消息要写入到哪个 Partition 并不是随机的,而是有路由策略的:

  • 若指定了 Partition,则直接写入到指定的 Partition。
  • 若未指定 Partition 但指定了 Key,则通过对 Key 的 Hash 值与 Partition 数量取模,该取模。
  • 结果就是要选出的 Partition 索引。
  • 若 Partition 和 Key 都未指定,则使用轮询算法选出一个 Partition。

③HW 截断机制

如果 Partition Leader 接收到了新的消息, ISR 中其它 Follower 正在同步过程中,还未同步完毕时 leader 宕机。

此时就需要选举出新的 Leader。若没有 HW 截断机制,将会导致 Partition 中 Leader 与 Follower 数据的不一致。

当原 Leader 宕机后又恢复时,将其 LEO 回退到其宕机时的 HW,然后再与新的 Leader 进行数据同步,这样就可以保证老 Leader 与新 Leader 中数据一致了,这种机制称为 HW 截断机制。

④消息发送的可靠性

生产者向 Kafka 发送消息时,可以选择需要的可靠性级别。通过 request.required.acks 参数的值进行设置。

0 值:异步发送。生产者向 Kafka 发送消息而不需要 Kafka 反馈成功 Ack。该方式效率最高,但可靠性最低。

其可能会存在消息丢失的情况:

  • 在传输过程中会出现消息丢失。
  • 在 Broker 内部会出现消息丢失。
  • 会出现写入到 Kafka 中的消息的顺序与生产顺序不一致的情况。

1 值:同步发送。生产者发送消息给 Kafka,Broker 的 Partition Leader 在收到消息后马上发送成功 Ack(无需等等 ISR 中的 Follower 同步)。

生产者收到后知道消息发送成功,然后会再发送消息。如果一直未收到 Kafka 的 Ack,则生产者会认为消息发送失败,会重发消息。

该方式对于 Producer 来说,若没有收到 Ack,一定可以确认消息发送失败了,然后可以重发。

但是,即使收到了 ACK,也不能保证消息一定就发送成功了。故,这种情况,也可能会发生消息丢失的情况。

-1 值:同步发送。生产者发送消息给 Kafka,Kafka 收到消息后要等到 ISR 列表中的所有副本都 同步消息完成后,才向生产者发送成功 Ack。

如果一直未收到 Kafka 的 Ack,则认为消息发送 失败,会自动重发消息。该方式会出现消息重复接收的情况。

⑤消费者消费过程解析

生产者将消息发送到 Topitc 中,消费者即可对其进行消费,其消费过程如下:

  • Consumer 向 Broker 提交连接请求,其所连接上的 Broker 都会向其发送Broker Controller 的通信 URL,即配置文件中的 Listeners 地址。
  • 当 Consumer 指定了要消费的 Topic 后,会向 Broker Controller 发送消费请求。
  • Broker Controller 会为 Consumer 分配一个或几个 Partition Leader,并将该 Partition 的当前 Offset 发送给 Consumer。
  • Consumer 会按照 Broker Controller 分配的 Partition 对其中的消息进行消费。
  • 当 Consumer 消费完该条消息后,Consumer 会向 Broker 发送一个消息已经被消费反馈,即该消息的 Offset。
  • 在 Broker 接收到 Consumer 的 Offset 后,会更新相应的 __consumer_offset 中。
  • 以上过程会一直重复,知道消费者停止请求消费。
  • Consumer 可以重置 Offset,从而可以灵活消费存储在 Broker 上的消息。

⑥Partition Leader 选举范围

当 Leader 宕机后,Broker Controller 会从 ISR 中挑选一个 Follower 成为新的 Leader。

如果 ISR 中没有其他副本怎么办?可以通过 unclean.leader.election.enable 的值来设置 Leader 选举范围。

False:必须等到 ISR 列表中所有的副本都活过来才进行新的选举。该策略可靠性有保证,但可用性低。

True:在 ISR 列表中没有副本的情况下,可以选择任意一个没有宕机的主机作为新的 Leader,该策略可用性高,但可靠性没有保证。

⑦重复消费问题的解决方案

同一个 Consumer 重复消费:当 Consumer 由于消费能力低而引发了消费超时,则可能会形成重复消费。

在某数据刚好消费完毕,但是正准备提交 Offset 时候,消费时间超时,则 Broker 认为这条消息未消费成功。这时就会产生重复消费问题。其解决方案:延长 Offset 提交时间。

不同的 Consumer 重复消费:当 Consumer 消费了消息,但还没有提交 Offset 时宕机,则这些已经被消费过的消息会被重复消费。其解决方案:将自动提交改为手动提交。

⑧从架构设计上解决 Kafka 重复消费的问题

在设计程序的时候,比如考虑到网络故障等一些异常的情况,都会设置消息的重试次数,可能还有其他可能出现消息重复,那应该如何解决呢?下面提供三个方案:

方案一:保存并查询

给每个消息都设置一个独一无二的 uuid,所有的消息,都要存一个 uuid。

在消费消息的时候,首先去持久化系统中查询一下看这个看是否以前消费过,如没有消费过,在进行消费,如果已经消费过,丢弃就好了。

下图表明了这种方案:

方案二:利用幂等

幂等(Idempotence)在数学上是这样定义的,如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。

一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

举个例子来说明一下。在不考虑并发的情况下,“将 X 老师的账户余额设置为 100 万元”,执行一次后对系统的影响是,X 老师的账户余额变成了 100 万元。

只要提供的参数 100 万元不变,那即使再执行多少次,X 老师的账户余额始终都是 100 万元,不会变化,这个操作就是一个幂等的操作。

再举一个例子,“将 X 老师的余额加 100 万元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 万元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

所以,通过这两个例子,可以想到如果系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。

那么,如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。

但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

下面介绍一种常用的方法:利用数据库的唯一约束实现幂等。

例如,刚刚提到的那个不具备幂等特性的转账的例子:将 X 老师的账户余额加 100 万元。在这个例子中,可以通过改造业务逻辑,让它具备幂等性。

首先,可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是在数据库中建一张转账流水表。

这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。

这样,消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”

在转账流水表增加一条转账记录这个操作中,由于在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。

方案三:设置前提条件

为更新的数据设置前置条件另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。

这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

比如,刚刚说过,“将 X 老师的账户的余额增加 100 万元”这个操作并不满足幂等性,可以把这个操作加上一个前置条件,变为:“如果 X 老师的账户当前的余额为 500 万元,将余额加 100 万元”,这个操作就具备了幂等性。

对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

但是,如果要更新的数据不是数值,或者要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?

更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等。